[CDK]Step Functions StateMachineの実行履歴を自動でAthenaに連携する
はじめに
コンサルティング部の神野です。
Step Functions StateMachineの実行結果を一覧でまとめたり分析したいと思ったりしたことはありませんか?
CloudWatchや該当StateMachineのコンソール画面でもある程度は確認することができますが、
全StateMachineの実行履歴を1テーブルに集約して蓄積及びBIツールなどで可視化して実行結果を分析したいといったケースもあるかと思います。
その可視化する前段として、Lambdaを活用してStateMachineの実行結果履歴をAthena(S3)に連携していきたいと思います!
前提
24時間以内に実行されたStateMachineの実行履歴を連携する簡易的なシステムの構築を進めていきます。
- EventBridge Schedulerで深夜0時に定期バッチに見立てたStateMachineを実行
- 深夜3時にStateMachineの実行履歴をAthenaに連携するLambda関数を実行
- 深夜3時に24時間以内に実行されたStateMachineの実行履歴を取得するLambda関数をEventBridge Schedulerで実行
- 取得した実行履歴をAthenaに連携
※データの実態はS3に格納される
構築
今回はCDK(TypeScript)を使って構築していきます。
TypeScriptで記載したLambda関数をトランスパイルせずそのままデプロイできるので採用しました。(CDK側でトランスパイル実行)
前提
CDK(TypeScript)を使用するため事前にNode.js及びCDKのインストールが必要です。もしインストールしていなければそれぞれインストールしておきます。
下記コマンドを実行してプロジェクトを作成します。
cdk init --language typescript
バージョン一覧
- Node.js 20.16.0
- aws-cdk 2.151.0
- @aws-cdk/aws-glue-alpha 2.151.0-alpha.0
- @aws-cdk/aws-scheduler-alpha 2.151.0-alpha.0
- @aws-cdk/aws-scheduler-targets-alpha 2.151.0-alpha.0
ライブラリインストール
Glue Database
、EventBridge Scheduler
などのL2 Constructを使うため一部アルファバージョンをインストールしています。
npm install @aws-cdk/aws-glue-alpha @aws-cdk/aws-scheduler-alpha @aws-cdk/aws-scheduler-targets-alpha
全体
今回は1つのStackcdk_state_machines_metrics-stack.ts
に全てコードを記載していきます。
コードは詳細に説明していきます。
// 必要なモジュールをインポート
import * as cdk from "aws-cdk-lib";
import { aws_lambda_nodejs, TimeZone } from "aws-cdk-lib";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as s3 from "aws-cdk-lib/aws-s3";
import * as athena from "aws-cdk-lib/aws-athena";
import * as iam from "aws-cdk-lib/aws-iam";
import * as glue from "@aws-cdk/aws-glue-alpha";
import * as sfn from "aws-cdk-lib/aws-stepfunctions";
import * as path from "path";
import { Construct } from "constructs";
import * as aws_scheduler_alpha from "@aws-cdk/aws-scheduler-alpha";
import * as aws_scheduler_targets_alpha from "@aws-cdk/aws-scheduler-targets-alpha";
// CDKスタッククラスを定義
export class CdkStepfunctionsMetricStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// メトリクスとAthenaクエリ結果を保存するS3バケットを作成
const bucket = new s3.Bucket(this, "StateMachineMetricsBucket", {
bucketName: "<your-bucket-name>",
removalPolicy: cdk.RemovalPolicy.DESTROY,
autoDeleteObjects: true,
});
// Athenaワークグループを作成
const workgroup = new athena.CfnWorkGroup(
this,
"StateMachinesMetricsWorkgroup",
{
name: "StateMachinesMetricsWorkgroup",
recursiveDeleteOption: true,
workGroupConfiguration: {
resultConfiguration: {
outputLocation: `s3://${bucket.bucketName}/athena-results/`,
},
publishCloudWatchMetricsEnabled: true,
enforceWorkGroupConfiguration: true,
bytesScannedCutoffPerQuery: 1073741824,
},
}
);
// Glueデータベースを作成
const database = new glue.Database(this, "StateMachinesMetricsDatabase", {
databaseName: "statemachines_metrics_db",
});
// Glueテーブルを作成(Athenaでクエリ可能)
const table = new glue.S3Table(this, "StateMachinesMetricsTable", {
database: database,
tableName: "statemachines_metrics",
columns: [
{ name: "name", type: glue.Schema.STRING },
{ name: "date", type: glue.Schema.STRING },
{ name: "starttime", type: glue.Schema.STRING },
{ name: "endtime", type: glue.Schema.STRING },
{ name: "duration", type: glue.Schema.DOUBLE },
{ name: "status", type: glue.Schema.STRING },
],
dataFormat: glue.DataFormat.JSON,
bucket: bucket,
s3Prefix: "statemachines_metrics/",
});
// Lambda関数用のIAMロールを作成
const lambdaRole = new iam.Role(this, "StateMachinesMetricsLambdaRole", {
assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
});
// Lambda実行に必要な基本的な権限を追加
lambdaRole.addManagedPolicy(
iam.ManagedPolicy.fromAwsManagedPolicyName(
"service-role/AWSLambdaBasicExecutionRole"
)
);
// 必要なAWSサービスへのアクセス権限を追加
lambdaRole.addToPolicy(
new iam.PolicyStatement({
actions: [
"athena:StartQueryExecution",
"states:GetExecutionHistory",
"states:DescribeExecution",
"states:ListStateMachines",
"states:ListExecutions",
],
resources: ["*"],
})
);
// Glue Data Catalogへのアクセス権限を追加
lambdaRole.addToPolicy(
new iam.PolicyStatement({
actions: ["glue:GetTable", "glue:GetPartitions"],
resources: [
"arn:aws:glue:*:*:catalog",
`arn:aws:glue:*:*:database/${database.databaseName}`,
`arn:aws:glue:*:*:table/${database.databaseName}/${table.tableName}`,
],
})
);
// Lambda関数を作成
const lambdaFunction = new aws_lambda_nodejs.NodejsFunction(
this,
"StateMachinesMetricsLambda",
{
runtime: lambda.Runtime.NODEJS_20_X,
handler: "handler",
entry: path.join(__dirname, "../lambda/index.ts"),
role: lambdaRole,
timeout: cdk.Duration.minutes(5),
environment: {
BUCKET_NAME: bucket.bucketName,
ATHENA_WORKGROUP: workgroup.name,
DATABASE_NAME: database.databaseName,
TABLE_NAME: table.tableName,
},
}
);
// Lambda関数にS3バケットへの読み書き権限を付与
bucket.grantReadWrite(lambdaFunction);
// Lambda関数を実行するためのターゲットを作成
const target = new aws_scheduler_targets_alpha.LambdaInvoke(
lambdaFunction,
{}
);
// Lambda関数を定期的に実行するスケジュールを作成
new aws_scheduler_alpha.Schedule(this, "Schedule", {
scheduleName: "invoke-lambda-schedule",
schedule: aws_scheduler_alpha.ScheduleExpression.cron({
minute: "0",
hour: "3",
day: "*",
month: "*",
year: "*",
timeZone: TimeZone.ASIA_TOKYO,
}),
target,
});
// 5つのStateMachineを作成
this.createMultipleStateMachines(5);
}
// 複数のStateMachineを作成するメソッド
private createMultipleStateMachines(count: number) {
for (let i = 1; i <= count; i++) {
// ランダムで5-300秒の待機時間を生成
const waitTime = this.generateRandomWaitTime(5, 300);
// StateMachineの定義を作成
const definition = sfn.Chain.start(
new sfn.Wait(this, `Wait${i}`, {
time: sfn.WaitTime.duration(cdk.Duration.seconds(waitTime)),
})
).next(new sfn.Succeed(this, `Success${i}`));
// StateMachineを作成
const stateMachine = new sfn.StateMachine(this, `StateMachine${i}`, {
definition,
stateMachineName: `TestStateMachine${i}`,
timeout: cdk.Duration.minutes(5),
});
// 各StateMachine用のスケジューラーを作成
this.createSchedulerForStateMachine(stateMachine, i);
}
}
// 指定された範囲でランダムな秒数を生成するメソッド
private generateRandomWaitTime(min: number, max: number): number {
return Math.floor(Math.random() * (max - min + 1) + min);
}
// StateMachineを実行するEventBridge Schedulerを設定するメソッド
private createSchedulerForStateMachine(
stateMachine: sfn.StateMachine,
index: number
) {
const target = new aws_scheduler_targets_alpha.StepFunctionsStartExecution(
stateMachine,
{}
);
new aws_scheduler_alpha.Schedule(this, `Schedule-StateMachine${index}`, {
scheduleName: `invoke-StateMachine${index}-schedule`,
schedule: aws_scheduler_alpha.ScheduleExpression.cron({
minute: "0",
hour: "0",
day: "*",
month: "*",
year: "*",
timeZone: TimeZone.ASIA_TOKYO,
}),
target,
});
}
}
S3
データ格納およびAthenaの実行結果格納用のバケットを作成します。
// メトリクスとAthenaクエリ結果を保存するS3バケットを作成
const bucket = new s3.Bucket(this, "StateMachineMetricsBucket", {
bucketName: "<your-bucket-name>",
removalPolicy: cdk.RemovalPolicy.DESTROY,
autoDeleteObjects: true,
});
Athena
下記リソースを作成していきます。
- WorkGroup
StateMachinesMetricsWorkgroup
- クエリ実行結果を
s3://${bucket.bucketName}/athena-results/
に格納するよう設定
- クエリ実行結果を
- Database
statemachines_metrics_db
- S3Table
statemachines_metrics
- 格納するデータはS3のPrefixを
statemacines_metrics/
に設定
// Athenaワークグループを作成
const workgroup = new athena.CfnWorkGroup(
this,
"StateMachinesMetricsWorkgroup",
{
name: "StateMachinesMetricsWorkgroup",
recursiveDeleteOption: true,
workGroupConfiguration: {
resultConfiguration: {
outputLocation: `s3://${bucket.bucketName}/athena-results/`,
},
publishCloudWatchMetricsEnabled: true,
enforceWorkGroupConfiguration: true,
bytesScannedCutoffPerQuery: 1073741824,
},
}
);
// Glueデータベースを作成
const database = new glue.Database(this, "StateMachinesMetricsDatabase", {
databaseName: "statemachines_metrics_db",
});
// Glueテーブルを作成(Athenaでクエリ可能)
const table = new glue.S3Table(this, "StateMachinesMetricsTable", {
database: database,
tableName: "statemachines_metrics",
columns: [
{ name: "name", type: glue.Schema.STRING },
{ name: "date", type: glue.Schema.STRING },
{ name: "starttime", type: glue.Schema.STRING },
{ name: "endtime", type: glue.Schema.STRING },
{ name: "duration", type: glue.Schema.DOUBLE },
{ name: "status", type: glue.Schema.STRING },
],
dataFormat: glue.DataFormat.JSON,
bucket: bucket,
s3Prefix: "statemachines_metrics/",
});
作成するテーブルはStateMachineの実行履歴を格納するテーブルで各カラムの役割は下記の通りです。
テーブルのカラム一覧
カラム名 | 説明 | データ型 |
---|---|---|
name | 実行したStateMachine名 | String |
date | 実行日 | String |
starttime | 実行の開始時間 | String |
endtime | 実行の終了時間 | String |
duration | 実行時間(秒) | Double |
status | ステータス | String |
Lambda + EventBridge Scheduler
下記リソースを作成します。
- Lambdaで使用するIAMロール
下記権限を付与- StateMachineの実行履歴を取得できる権限
- Glue Data Catalogへのアクセス権限
- Athenaのクエリ実行権限
- データ格納用バケットへの参照・書き込み権限
- Lambda
- StateMachineから結果を取得し、Athena(S3)にデータを連携する処理
lambda
フォルダに格納したソースコードを指定- 環境変数の設定
- S3バケット名
- 使用するAthenaのWorkGroup名
- 使用するデータベース名
- 使用するテーブル名
- EventBridge Scheduler
- 毎日深夜3時にLambda関数を実行するよう設定
// Lambda関数用のIAMロールを作成
const lambdaRole = new iam.Role(this, "StateMachinesMetricsLambdaRole", {
assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
});
// Lambda実行に必要な基本的な権限を追加
lambdaRole.addManagedPolicy(
iam.ManagedPolicy.fromAwsManagedPolicyName(
"service-role/AWSLambdaBasicExecutionRole"
)
);
// 必要なAWSサービスへのアクセス権限を追加
lambdaRole.addToPolicy(
new iam.PolicyStatement({
actions: [
"athena:StartQueryExecution",
"states:GetExecutionHistory",
"states:DescribeExecution",
"states:ListStateMachines",
"states:ListExecutions",
],
resources: ["*"],
})
);
// Glue Data Catalogへのアクセス権限を追加
lambdaRole.addToPolicy(
new iam.PolicyStatement({
actions: ["glue:GetTable", "glue:GetPartitions"],
resources: [
"arn:aws:glue:*:*:catalog",
`arn:aws:glue:*:*:database/${database.databaseName}`,
`arn:aws:glue:*:*:table/${database.databaseName}/${table.tableName}`,
],
})
);
// Lambda関数を作成
const lambdaFunction = new aws_lambda_nodejs.NodejsFunction(
this,
"StateMachinesMetricsLambda",
{
runtime: lambda.Runtime.NODEJS_20_X,
handler: "handler",
entry: path.join(__dirname, "../lambda/index.ts"),
role: lambdaRole,
timeout: cdk.Duration.minutes(5),
environment: {
BUCKET_NAME: bucket.bucketName,
ATHENA_WORKGROUP: workgroup.name,
DATABASE_NAME: database.databaseName,
TABLE_NAME: table.tableName,
},
}
);
// Lambda関数にS3バケットへの読み書き権限を付与
bucket.grantReadWrite(lambdaFunction);
// Lambda関数を実行するためのターゲットを作成
const target = new aws_scheduler_targets_alpha.LambdaInvoke(
lambdaFunction,
{}
);
// Lambda関数を定期的に実行するスケジュールを作成
new aws_scheduler_alpha.Schedule(this, "Schedule", {
scheduleName: "invoke-lambda-schedule",
schedule: aws_scheduler_alpha.ScheduleExpression.cron({
minute: "0",
hour: "3",
day: "*",
month: "*",
year: "*",
timeZone: TimeZone.ASIA_TOKYO,
}),
target,
});
StateMachine + EventBridge Scheduler
下記リソースを作成します。
- StateMachine
- StateMachineは大量に作成できるよう別関数に切り出して、引数で指定した数分StateMachineを作成
- 指定の秒数を待って完了するといったシンプルな処理を作成
- EventBridge Scheduler
- 定期実行で深夜0時に作成したStateMachineを実行するよう設定
export class CdkStateMachinesMetricsStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// 省略
// 5つのStateMachineを作成
this.createMultipleStateMachines(5);
}
private createMultipleStateMachines(count: number) {
for (let i = 1; i <= count; i++) {
// ランダムで5-300秒を取得する
const waitTime = this.generateRandomWaitTime(5, 300);
const definition = sfn.Chain.start(
new sfn.Wait(this, `Wait${i}`, {
time: sfn.WaitTime.duration(cdk.Duration.seconds(waitTime)),
})
).next(new sfn.Succeed(this, `Success${i}`));
const stateMachine = new sfn.StateMachine(this, `StateMachine${i}`, {
definition,
stateMachineName: `TestStateMachine${i}`,
timeout: cdk.Duration.minutes(5),
});
// Create a scheduler for each StateMachine
this.createSchedulerForStateMachine(stateMachine, i);
}
}
// 指定された範囲でランダムな秒数を取得する関数
private generateRandomWaitTime(min: number, max: number): number {
return Math.floor(Math.random() * (max - min + 1) + min);
}
// StateMachineを実行するEventBridge Schedulerを設定する関数
private createSchedulerForStateMachine(
stateMachine: sfn.StateMachine,
index: number
) {
const target = new aws_scheduler_targets_alpha.StepFunctionsStartExecution(
stateMachine,
{}
);
new aws_scheduler_alpha.Schedule(this, `Schedule-StateMachine${index}`, {
scheduleName: `invoke-StateMachine${index}-schedule`,
schedule: aws_scheduler_alpha.ScheduleExpression.cron({
minute: "0",
hour: "0",
day: "*",
month: "*",
year: "*",
timeZone: TimeZone.ASIA_TOKYO,
}),
target,
});
}
}
Lambdaの処理
プロジェクトフォルダー直下にlambda
フォルダを作成し、フォルダ内にindex.ts
を作成して処理を記載します。
準備
-
npm init
コマンド実行でプロジェクトの作成実行コマンドnpm init # 諸々と聞かれるが、特段何も変更する箇所がなければ全てyes
-
日付の時間操作を行うので下記ライブラリをインストールしておきます。
- date-fns
実行コマンドnpm install date-fns
処理詳細
作成するLambda関数は以下の手順で、StateMachineの実行履歴を取得し、Athenaのテーブルにデータを挿入します。
- StateMachine一覧の取得と実行履歴の収集
- AWS SDKの
StepFunctions
クライアントを使用してlistStateMachines
APIを呼び出し、環境内の全StateMachineの一覧を取得 - 各StateMachineに対して
listExecutions
APIを呼び出し、過去24時間以内の実行結果を取得
※毎日連携するため、24時間以内で実行されたものに限定 RUNNING
状態の実行結果は除外し、完了した実行のみを対象- ページネーションを使用して、100件ずつ実行履歴を取得
- AWS SDKの
- データの整形とAthenaクエリの生成
- 取得した実行結果を、Athenaテーブルのスキーマ(name, date, starttime, endtime, duration, status)に合わせて整形
date-fns
ライブラリを使用して、日付と時間をフォーマットし、UTCからJSTに変換- 整形されたデータを使用して、環境変数から取得したデータベース名とテーブル名を用いて
INSERT
するSQLクエリを生成
- Athenaクエリの実行とエラーハンドリング
- AWS SDKの
AthenaClient
を使用して、生成したクエリをstartQueryExecution
APIで非同期に実行 - クエリ実行時に、環境変数から取得したワークグループ名とS3出力先を指定
- try-catchブロックを使用してエラーハンドリングを行い、エラーが発生した場合はコンソールにログを出力
- AWS SDKの
- 実行結果のログ出力
- 処理完了時に、成功メッセージをレスポンスとして返却
- エラーが発生した場合は、500エラーとエラーメッセージを返却
import { StepFunctions, Athena } from "aws-sdk";
import { APIGatewayProxyResult } from "aws-lambda";
import { format, addHours } from "date-fns";
// AWS SDKのクライアントを初期化
const sfn = new StepFunctions();
const athena = new Athena();
// 環境変数から必要な値を取得
const BUCKET_NAME = process.env.BUCKET_NAME;
const ATHENA_WORKGROUP = process.env.ATHENA_WORKGROUP;
const DATABASE_NAME = process.env.DATABASE_NAME;
const TABLE_NAME = process.env.TABLE_NAME;
// StateMachine実行結果の型定義
interface ExecutionResult {
StateMachineName: string;
StartTime: string;
Date: string;
EndTime: string;
Duration: number;
Status: string;
}
// Lambda関数のメインハンドラー
export const handler = async (event: any): Promise<APIGatewayProxyResult> => {
try {
// 全StateMachineの実行結果を取得
const results: ExecutionResult[] = await getAllExecutions();
// 実行結果がない場合は処理をスキップ
if (results.length === 0)
return {
statusCode: 200,
body: JSON.stringify("Skip no data"),
};
// Athenaクエリを生成
const query = buildAthenaQuery(results);
// Athenaクエリを実行
await executeAthenaQuery(query);
// 成功レスポンスを返す
return {
statusCode: 200,
body: JSON.stringify(
"Daily State Machines execution summary inserted into Athena table"
),
};
} catch (error) {
// エラーハンドリング
console.error("Error:", error);
return {
statusCode: 500,
body: JSON.stringify("An error occurred while processing the request"),
};
}
};
// 全StateMachineの実行結果を取得する関数
async function getAllExecutions(): Promise<ExecutionResult[]> {
const stateMachines = await getStateMachines();
let allResults: ExecutionResult[] = [];
for (const stateMachineArn of stateMachines) {
const executions = await listExecutions(stateMachineArn);
allResults = allResults.concat(executions);
}
return allResults;
}
// 全StateMachineのARNを取得する関数
async function getStateMachines(): Promise<string[]> {
const response = await sfn.listStateMachines().promise();
return (
response.stateMachines?.map(
(sm: StepFunctions.StateMachineListItem) => sm.stateMachineArn ?? ""
) || []
);
}
// UTCからJSTに変換する関数
function toJST(date: Date): Date {
return addHours(date, 9);
}
// 日付をフォーマットする関数
function formatDate(date: Date): string {
return format(toJST(date), "yyyy-MM-dd HH:mm:ss");
}
// 特定のStateMachineの実行結果を取得する関数
async function listExecutions(
stateMachineArn: string
): Promise<ExecutionResult[]> {
const results: ExecutionResult[] = [];
const twentyFourHoursAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
let nextToken: string | undefined;
do {
// StateMachineの実行履歴を取得
const response = await sfn
.listExecutions({
stateMachineArn,
maxResults: 100,
nextToken,
})
.promise();
if (response.executions) {
for (const execution of response.executions) {
// 24時間以内の完了した実行のみを対象とする
if (
execution.startDate >= twentyFourHoursAgo &&
execution.status !== "RUNNING"
) {
const endTime = execution.stopDate ? execution.stopDate : new Date();
results.push({
StateMachineName: stateMachineArn.split(":").pop() || "",
StartTime: formatDate(execution.startDate),
Date: format(toJST(execution.startDate), "yyyy-MM-dd"),
Duration: (endTime.getTime() - execution.startDate.getTime()) / 1000,
EndTime: formatDate(endTime),
Status: execution.status,
});
} else if (execution.startDate < twentyFourHoursAgo) {
// 24時間以上前の実行は対象外なのでループを終了
nextToken = undefined;
break;
}
}
}
nextToken = response.nextToken;
} while (nextToken);
return results;
}
// Athenaクエリを生成する関数
function buildAthenaQuery(results: ExecutionResult[]): string {
const tableName = TABLE_NAME;
const databaseName = DATABASE_NAME;
// INSERTクエリの基本構造を作成
let query = `
INSERT INTO ${databaseName}.${tableName}
SELECT
name,
date,
starttime,
endtime,
duration,
status
FROM (
`;
// 各実行結果をUNION ALLで結合
for (let i = 0; i < results.length; i++) {
const result = results[i];
query += `
SELECT
'${result.StateMachineName}' as name,
'${result.Date}' as date,
'${result.StartTime}' as starttime,
'${result.EndTime}' as endtime,
${result.Duration} as duration,
'${result.Status}' as status
`;
if (i < results.length - 1) {
query += "UNION ALL";
}
}
query += ")";
console.log(query);
return query;
}
// Athenaクエリを実行する関数
async function executeAthenaQuery(query: string): Promise<void> {
const params: Athena.StartQueryExecutionInput = {
QueryString: query,
QueryExecutionContext: {
Database: DATABASE_NAME,
},
ResultConfiguration: {
OutputLocation: `s3://${BUCKET_NAME}/athena-results/`,
},
WorkGroup: ATHENA_WORKGROUP,
};
// クエリを非同期で実行
await athena.startQueryExecution(params).promise();
}
この処理により、環境内の全StateMachineの直近24時間の実行履歴が自動的にAthenaテーブルに格納され、後続の分析や可視化に利用できるようになります。
環境作成
deploy
コマンドを実行して、環境を作成します。
cdk deploy
動作確認
StateMachineはEventBridge Schedulerで0時に起動、Lambda関数は3時に起動してデータが連携されているので、Athenaで下記クエリを実行して連携されたデータを確認してみます。下記を使用してクエリを実行します。
- CDKで作成したワークグループ
StateMachinesMetricsWorkgroup
を選択 - CDKで作成したデータベース
statemachines_metrics_db
を選択 - CDKで作成したテーブル
statemachines_metrics
へSQL実行
select *
from statemachines_metrics
実行結果
実行結果
クエリを実行すると問題なく連携されていますね!
おわりに
StateMachineの実行履歴をAthenaに連携する方法はいかがだったでしょうか?
データの蓄積やBIツールでの分析などを可能にするための一歩として参考になりましたら幸いです。